
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
  <title>Stock Distributor implementation for the Stock Quoter Publisher/Subscriber Real-time CORBA Service</title>
</head>

<body
 text = "#000000"
 link = "#000fff"
 vLink= "#ff0f0f"
 aLink = "#0000ff"
 bgColor = "#ffffff">

<hr><h2>Stock Distributor implementation for the Stock Quoter Publisher/Subscriber Real-time CORBA Service</h2><hr>

<h3>Implementation of StockQuoter interface</h3>

This interface is used by brokers to get detailed stock information. <P>

The Stock_StockQuoter_i class, a subclass of POA_Stock::StockQuoter, is generated automatically by the
IDL compiler (using the -GI flag). <P>

<h4>Implementing the get_stock_info () member function</h4>

<li>Call the get_stock_info () function of Stock_Database instance.</li>

<PRE>
    Stock::StockInfo *stock = STOCK_DATABASE->get_stock_info (stock_name);
</PRE>

<hr><h3>Implementation of StockDistributorHome interface</h3>

This interface is used to register the necessary factories and mappings with the specified ORB and
to create StockDistributor objects. <P>

The Stock_StockDistributorHome_i class, a subclass of POA_Stock::StockDistributorHome, is generated
automatically by the IDL compiler (using the -GI flag). <P>

This class is also a subclass of ACE_Event_Handler, so it can be registered as an event handler. <P>

<h4>Implementing the Constructor</h4>

The main steps of this function are described as follows: <P>

<li>Register this object as a signal handler with the ORB's reactor to catch Ctrl-C (SIGINT) from the command line. </li>

<PRE>
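    // Report a failed registration at error severity. %p appends the
    // errno description, so the tag needs no trailing newline.
    if (orb_->orb_core ()->reactor ()->register_handler (SIGINT, this) == -1)
      ACE_ERROR ((LM_ERROR, "ERROR: Failed to register as a signal handler: %p\n",
                  "register_handler"));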
</PRE>

<li>Register the necessary factories and mappings with the specified ORB.</li>

<PRE>
    Stock::StockName_init *stockname_factory = new Stock::StockName_init;
    orb->register_value_factory (stockname_factory->tao_repository_id (),
                                 stockname_factory
                                 ACE_ENV_ARG_PARAMETER);

    Stock::Cookie_init *cookie_factory = new Stock::Cookie_init;
    orb->register_value_factory (cookie_factory->tao_repository_id (),
                                 cookie_factory
                                 ACE_ENV_ARG_PARAMETER);

    Stock_PriorityMapping::register_mapping (orb);
</PRE>

<li>Initialize the Stock database.</li>

<PRE>
    STOCK_DATABASE->activate (THR_NEW_LWP | THR_JOINABLE, 1);
</PRE>

<li>Create a CORBA::PolicyList for the child POA.</li>

This step includes several sub-steps:
<OL>
<li>    Get a reference to the RTORB.</li>
<li>    Initialize a CORBA::PolicyList object.</li>
<li>    Create a SERVER_DECLARED priority model policy and add it to the policy list.</li>
<li>    Create a thread pool with lanes for the distributor. Since the brokers have various priorities,
create a lane for each priority.</li>
<li>    Create a thread pool policy from that thread pool and add it to the policy list.</li>
<li>    Create a child POA with the policy list and narrow it to an RTPOA.</li>
</OL>

<PRE>
    CORBA::Object_var obj = orb_->resolve_initial_references ("RTORB");
    RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (obj.in ());

    TAO::Utils::PolicyList_Destroyer policies (2);
    policies.length (2);

    policies[0] =
      rt_orb->create_priority_model_policy (RTCORBA::SERVER_DECLARED,
                                            Stock::Priority_Mapping::VERY_LOW);

    RTCORBA::ThreadpoolLanes lanes (5); lanes.length (5);

    for (CORBA::ULong i = 0; i < lanes.length (); ++i)
      {
        lanes[i].lane_priority = static_cast&lt;RTCORBA::Priority&gt; (i);
        lanes[i].static_threads = 5;
        lanes[i].dynamic_threads = 0;
      }

    RTCORBA::ThreadpoolId threadpool_id =
      rt_orb->create_threadpool_with_lanes (1024*1024,
                                            lanes,
                                            false, false, 0, 0);

    policies[1] = rt_orb->create_threadpool_policy (threadpool_id);

    PortableServer::POA_var poa = this->_default_POA ();
    PortableServer::POAManager_var poa_mgr = poa->the_POAManager ();

    PortableServer::POA_var child_poa =
      poa->create_POA ("StockDistributor_POA",
                       poa_mgr.in (),
                       policies);

    this->rt_poa_ = RTPortableServer::POA::_narrow (child_poa.in ());
</PRE>
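
The lane setup above can be modelled in standard C++ without TAO. This is only a sketch: the Lane struct, make_lanes (), and find_lane () are hypothetical names, and it assumes that a request is served by the lane whose priority exactly matches the request priority. With five lanes of five static threads each, the pool holds 25 threads in total. <P>

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical mirror of the three lane fields set in the tutorial code.
struct Lane
{
  short lane_priority;
  unsigned static_threads;
  unsigned dynamic_threads;
};

// Builds one lane per priority level 0 .. count-1, with a fixed number
// of static threads and no dynamic threads.
inline std::vector<Lane>
make_lanes (std::size_t count, unsigned threads_per_lane)
{
  std::vector<Lane> lanes;
  lanes.reserve (count);
  for (std::size_t i = 0; i < count; ++i)
    lanes.push_back (Lane { static_cast<short> (i), threads_per_lane, 0 });
  return lanes;
}

// Returns the index of the lane whose priority equals the requested
// priority, or std::nullopt when no lane matches (assumed semantics).
inline std::optional<std::size_t>
find_lane (const std::vector<Lane> &lanes, short priority)
{
  for (std::size_t i = 0; i < lanes.size (); ++i)
    if (lanes[i].lane_priority == priority)
      return i;
  return std::nullopt;
}
```

Under this assumption, a priority outside the configured lanes finds no lane, which is why the tutorial creates one lane for every broker priority. <P>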

<li>Create the initial distributor reference. </li>

<PRE>
    this->create_distributor ();
</PRE>

<h4>Implementing the create_distributor () member function</h4>

<li>Create a new instance of StockDistributor_i, then activate it under the child RT POA created in the constructor.
The object is therefore governed by that POA's policies: the SERVER_DECLARED priority model and the thread pool with lanes. </li>

<PRE>
    StockDistributor_i *servant = new StockDistributor_i (this->rt_poa_.in ());
    PortableServer::ServantBase_var distributor_owner_transfer = servant;
    this->dist_id_ = this->rt_poa_->activate_object (servant);
</PRE>

<h4>Implementing the create () member function</h4>

<li>Get and narrow the StockDistributor object reference and return it. </li>

<PRE>
    CORBA::Object_var obj = this->rt_poa_->id_to_reference (this->dist_id_.in ());
    return Stock::StockDistributor::_narrow (obj.in ());
</PRE>

<h4>Implementing the handle_signal () member function</h4>

<li>Look up the distributor object, shut it down, and then shut down the ORB it uses. </li>

<PRE>
    CORBA::Object_var obj (this->rt_poa_->id_to_reference (this->dist_id_.in ()));
    Stock::StockDistributor_var dist (Stock::StockDistributor::_narrow (obj.in ()));
    dist->shutdown ();

    this->orb_->shutdown (true);
</PRE>
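
In the tutorial, the ACE reactor calls handle_signal () when SIGINT arrives. The same idea can be sketched with the standard C++ signal facilities alone. This is a simplified analogue, not the ACE mechanism: the names shutdown_requested, on_sigint (), and install_sigint_handler () are hypothetical, and the handler only records the request so that the main loop can perform the actual shutdown. <P>

```cpp
#include <csignal>

namespace
{
  // Written by the signal handler and polled by the main loop.
  // volatile std::sig_atomic_t is the type that is safe for this purpose.
  volatile std::sig_atomic_t shutdown_requested = 0;

  // Keep the handler minimal: it only records that Ctrl-C was pressed.
  void on_sigint (int)
  {
    shutdown_requested = 1;
  }
}

// Installs the SIGINT handler; returns false if installation failed.
inline bool
install_sigint_handler ()
{
  return std::signal (SIGINT, on_sigint) != SIG_ERR;
}
```

A server loop would call install_sigint_handler () once at startup and leave its event loop when shutdown_requested becomes non-zero. <P>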

<hr><h3>Implementation of StockDistributor interface</h3>

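This interface is implemented by the Stock Distributor server. It provides operations to subscribe to and unsubscribe
from stock notifications, to obtain a StockQuoter reference, and to start, stop, and shut down the distributor. <P>
The Stock_StockDistributor_i class, a subclass of POA_Stock::StockDistributor, is generated automatically by the
IDL compiler (using the -GI flag). <P>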

<h4>Implementing the Constructor</h4>

<PRE>
    StockDistributor_i::StockDistributor_i (RTPortableServer::POA_ptr poa)
      : rate_ (3), // Default is 3 seconds (3000 milliseconds).
        active_ (false),
        rt_poa_ (RTPortableServer::POA::_duplicate (poa)),
        orb_ (CORBA::ORB::_duplicate (poa->_get_orb ()))
</PRE>

"rate_" and "active_" are private members of the Stock_StockDistributor_i class. They hold
the notification rate and the active state of the StockDistributor object. <P>

<h4>Implementing the subscribe_notifier () member function</h4>

The main steps of this function are described as follows: <P>

<li>Get the write thread mutex of the subscriber map.</li> <P>

<li>Generate a unique ID for the cookie and use it to create a new Cookie object.</li> <P>

<li>Insert the StockNameConsumer object and its priority into the StockDistributor object's subscriber map,
using the cookie id as the key.</li> <P>

<li>Return the Cookie object.</li> <P>
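
The steps above can be sketched in standard C++, independent of TAO. This is a minimal illustration, not the tutorial's actual code: std::shared_mutex stands in for the read/write mutex, a std::string stands in for the StockNameConsumer reference, a plain counter stands in for the unique ID generator, and the names SubscriberMap, subscribe (), and next_id_ are hypothetical. <P>

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for the distributor's subscriber map.
// Key: cookie ID. Value: (consumer, priority), mirroring the
// iter->second.first / iter->second.second usage in this tutorial.
class SubscriberMap
{
public:
  using Entry = std::pair<std::string, int>;

  // Returns the cookie ID that identifies the new subscription.
  std::uint64_t subscribe (const std::string &consumer, int priority)
  {
    // Step 1: take the write lock, because the map is about to change.
    std::unique_lock<std::shared_mutex> guard (this->lock_);

    // Step 2: generate a unique ID (a simple counter in this sketch).
    const std::uint64_t id = this->next_id_++;

    // Step 3: store the consumer and its priority under that ID.
    this->subscribers_.emplace (id, Entry (consumer, priority));

    // Step 4: hand the ID back so the caller can unsubscribe later.
    return id;
  }

  std::size_t size () const
  {
    // Readers only need the shared (read) lock.
    std::shared_lock<std::shared_mutex> guard (this->lock_);
    return this->subscribers_.size ();
  }

private:
  mutable std::shared_mutex lock_;
  std::map<std::uint64_t, Entry> subscribers_;
  std::uint64_t next_id_ = 1;
};
```

Taking the exclusive lock only for mutations lets any number of readers, such as the publishing thread, iterate over the map concurrently. <P>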

<h4>Implementing the unsubscribe_notifier () member function</h4>

The main steps of this function are described as follows: <P>

<li>Check the value of the Cookie object.</li> <P>

<li>Get the write thread mutex of the subscriber map.</li> <P>

<li>Look up the Cookie object passed as the argument in the StockDistributor object's subscriber map.</li> <P>

<li>Erase the Cookie object from the StockDistributor object's subscriber map.</li> <P>

<li>Return the StockNameConsumer object which is related to the erased Cookie object.</li> <P>
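
A standalone sketch of these steps in standard C++ follows. It is illustrative only: std::shared_mutex stands in for the read/write mutex, a std::string stands in for the StockNameConsumer reference, and the names CookieMap, Entry, and unsubscribe () are chosen for this example. How the real implementation reports an invalid or unknown cookie is not shown in this tutorial, so the sketch simply returns an empty std::optional and treats an ID of 0 as invalid. <P>

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <utility>

using Entry = std::pair<std::string, int>;         // (consumer, priority)
using CookieMap = std::map<std::uint64_t, Entry>;  // keyed by cookie ID

// Removes the subscription identified by cookie_id and returns its
// consumer, or std::nullopt when the cookie is invalid or unknown.
inline std::optional<std::string>
unsubscribe (CookieMap &subscribers,
             std::shared_mutex &lock,
             std::uint64_t cookie_id)
{
  // Step 1: check the cookie value (0 is invalid in this sketch).
  if (cookie_id == 0)
    return std::nullopt;

  // Step 2: take the write lock, because the map is about to change.
  std::unique_lock<std::shared_mutex> guard (lock);

  // Step 3: look the cookie up in the subscriber map.
  CookieMap::iterator iter = subscribers.find (cookie_id);
  if (iter == subscribers.end ())
    return std::nullopt;

  // Steps 4 and 5: remember the consumer, erase the entry, return it.
  std::string consumer = iter->second.first;
  subscribers.erase (iter);
  return consumer;
}
```

Copying the consumer out before erasing matters, because erasing invalidates the iterator that refers to the entry. <P>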

<h4>Implementing the provide_quoter_info () member function</h4>

<li>Get the read thread mutex of the subscriber map.</li> <P>

<li>Look up the Cookie object passed as the argument in the StockDistributor object's subscriber map.</li> <P>

<PRE>
    CookieMap::const_iterator iter = this->subscribers_list_.find (ck->cookie_id ());
</PRE>

<li>Create a new instance of the StockQuoter object and activate it under the RT POA with the priority stored for that subscriber.</li> <P>

<PRE>
    Stock_StockQuoter_i *quoter = new Stock_StockQuoter_i;
    PortableServer::ServantBase_var owner_transfer (quoter);

    // Hold the returned ObjectId in a _var so that it is released automatically.
    PortableServer::ObjectId_var oid =
      this->rt_poa_->activate_object_with_priority (quoter,
                                                    iter->second.second);

    CORBA::Object_var obj = rt_poa_->id_to_reference (oid.in ());
    Stock::StockQuoter_var quoter_var = ::Stock::StockQuoter::_narrow (obj.in ());
</PRE>

<li>Return the StockQuoter object reference.</li> <P>

<h4>Implementing the start () member function</h4>

Set the active state of the StockDistributor object to true and create a thread to publish the stock information
to the Stock Broker clients. <P>

The main steps of this thread function are described as follows:  <P>
While the StockDistributor object is active, the thread repeats the following steps:

<li>Get the read thread mutex of the subscriber map.</li>

<li>Publish the stock information to all the StockNameConsumer objects that have subscribed.</li>
<PRE>
    for (Stock_StockDistributor_i::CookieMap::iterator iter = this->subscribers_list_.begin ();
         iter != this->subscribers_list_.end ();
         ++iter)
      {
        try
          {
            // Set the designated priority for current request.
            CORBA::Object_var obj = orb_->resolve_initial_references ("RTCurrent");
            RTCORBA::Current_var rt_current =
              RTCORBA::Current::_narrow (obj);

            rt_current->the_priority (iter->second.second);

            // Tell the database to push its information to the
            // &lt;consumer&gt;, which passes along the CORBA priority
            // in the service_context list of the GIOP message.
            STOCK_DATABASE->publish_stock_info (iter->second.first);
          }
        catch (const CORBA::Exception &ex)
          {
            ACE_PRINT_EXCEPTION (ex, "Stock_StockDistributor_i::svc: ");
          }
      }
</PRE>

<li>Sleep for the number of seconds specified by the notification rate.</li>
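
The control flow of the publishing thread (publish, sleep, repeat while active) can be sketched in standard C++ as follows. This is not the tutorial's svc () implementation: publish_loop () is a hypothetical name, the publish callback stands in for pushing stock information to every subscriber, and the sleep callback is injectable only so that the loop can be exercised without real delays. <P>

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Body of the publishing thread: publish, then sleep for the
// notification rate, until the active flag is cleared.
inline void
publish_loop (const std::atomic<bool> &active,
              std::chrono::seconds rate,
              const std::function<void ()> &publish,
              const std::function<void (std::chrono::seconds)> &sleep =
                [] (std::chrono::seconds d)
                { std::this_thread::sleep_for (d); })
{
  while (active.load ())
    {
      // Push the current stock information to the subscribers.
      publish ();

      // Wait for one notification period before the next round.
      sleep (rate);
    }
}
```

In this model, start () would set the flag to true and run publish_loop () on a new thread, and stop () only needs to store false into the flag. The loop then exits after the current sleep finishes. <P>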

<h4>Implementing the stop () member function</h4>

<li>Set the active state of StockDistributor object to false.</li>

<h4>Implementing the shutdown () member function</h4>

<li>Stop publishing events.</li>

<PRE>
    this->stop ();
</PRE>

<li>Deactivate the StockDistributor object.</li>

<PRE>
    ::Stock::StockDistributor_var dist = this->_this ();
    PortableServer::ObjectId_var oid = this->rt_poa_->reference_to_id (dist.in ());

    this->rt_poa_->deactivate_object (oid.in ());
</PRE>

<hr><b>Email: </b><ADDRESS><a href="mailto:shanshan.jiang@vanderbilt.edu">shanshan.jiang@vanderbilt.edu</a></ADDRESS>

</body>

</html>
